from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import datetime
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")
from dagen.operators import SSHOperator

pt= '{{ {{ execution_date.in_tz('Asia/Shanghai').format('YYYYMMDD') }} }}'
#调度周期
schedule_interval="0 4 * * *"


dag = DAG(
**{
'dag_id': 'ads_cdp_user_profile_gen_v2',
'catchup': False,
'start_date': datetime.datetime(2021, month=10, day=13, tzinfo=local_tz),
'schedule_interval': schedule_interval,
'default_args':
    {
        'owner': 'Avris',
        'depends_on_past': False,
        'retries': 2,
        'email': ['guanghu@tesla.com'],
        'email_on_failure': True,
        'email_on_retry': False
    }
}
)

create_table='bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start --job-name ads_cdp_user_profile_create'

create_table_task = SSHOperator(
    dag=dag,
    **{
        'task_id': 'create_table_task',
        'ssh_conn_id': 'eden_yarn_stg',
        'command': create_table
      }
)


dws2ads='bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start --job-name ads_cdp_user_profile_overwrite --sql-params "--hivevar pt=%s"'%pt

ads_cdp_user_profile_gen_task = SSHOperator(
    dag=dag,
    **{
        'task_id': 'ads_cdp_user_profile_gen_task',
        'ssh_conn_id': 'eden_yarn_stg',
        'command': dws2ads
      }
)

create_table_task>>ads_cdp_user_profile_gen_task